基于Docker搭建Hadoop集群

将Hadoop打包到Docker镜像中,就可以快速的在单个机器上搭建Hadoop集群,这样可以方便新手测试和学习。

Hadoop的master和slave分别运行在不同的Docker容器中,其中NameNode、ResourceManager运行在hadoop-master容器中,DataNode、NodeManager运行在hadoop-slave容器中。NameNode和DataNode是Hadoop分布式文件系统HDFS的组件,负责储存输入以及输出数据,而ResourceManager和NodeManager是Hadoop集群资源管理系统YARN的组件,负责CPU和内存资源的调度。

接下来将以Ubuntu16.04为基镜像,介绍搭建Hadoop3.2.0伪分布式集群运行环境的步骤:

A. 3节点Hadoop集群搭建步骤:

1. 下载Docker镜像
1
sudo docker pull lyingbo/hadoop:3.2.0
2. 下载shell脚本
1
git clone https://github.com/lyingbo/hadoop-cluster-docker.git
3. 运行Docker容器
1
2
cd hadoop-cluster-docker
sudo ./start-container.sh

运行结果:

1
2
3
4
start hadoop-master container...
start hadoop-slave1 container...
start hadoop-slave2 container...
root@hadoop-master:/#
  • 启动了3个容器,1个master,2个slave
  • 运行后就进入了hadoop-master容器的/root目录
4. 启动hadoop集群
1
start-all.sh

运行结果:

1
2
3
4
5
6
7
8
Starting namenodes on [hadoop-master]
hadoop-master: Warning: Permanently added 'hadoop-master,172.18.0.2' (ECDSA) to the list of known hosts.
Starting datanodes
hadoop-slave2: Warning: Permanently added 'hadoop-slave2,172.18.0.4' (ECDSA) to the list of known hosts.
hadoop-slave1: Warning: Permanently added 'hadoop-slave1,172.18.0.3' (ECDSA) to the list of known hosts.
Starting secondary namenodes [hadoop-master]
Starting resourcemanager
Starting nodemanagers
5. 运行wordcount示例
1
run-wordcount.sh

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
input file1.txt:
Hello Docker

input file2.txt:
Hello Hadoop

input file3.txt:
Hello MapReduce

wordcount output:
Docker 1
Hadoop 1
Hello 3
MapReduce 1

B. N节点Hadoop集群搭建步骤:

1. 准备
1
参考A部分2.下载shell脚本
2. 重新构建Docker镜像
1
sudo ./resize-cluster.sh 5
  • 可以指定任意N(N>1)
3. 启动Docker容器
1
sudo ./start-container.sh 5
  • 与前一步中的N保持一致。
4. 运行Hadoop
1
参考A部分4~6.启动Hadoop,并运行wordcount。

OpenDDS之Qos策略(3)

OpenDDS主要支持22种Qos,前一篇OpenDDS之Qos策略(2)已对前12种Qos的具体内容作了介绍,接下来介绍下这剩下的10种Qos的具体内容以及用法。

13、TRANSPORT_PRIORITY

TRANSPORT_PRIORITY策略主要应用于主题和数据写者实体,下面是TRANSPORT_PRIORITY QoS策略的IDL:

1
2
3
struct TransportPriorityQosPolicy {
long value;
};

value的默认值是0。该策略用于指定传输层发送消息时采用的优先级,值越大,表示优先级越高。OpenDDS把优先级映射到线程的优先级和DiffServ码点值上。默认值为0表示发送消息时既不修改线程的优先级,也不修改DiffServ的码点值。

OpenDDS支持设置发送线程的传输优先级和接收线程的传输优先级。传输优先级从0到最大值线性地映射0(默认)到最大线程优先级。如果最低传输优先级不是0,也映射为线程的优先级0。如果优先级在系统上出现倒置(数值越大优先级越低),OpenDDS将从0开始的增加的优先级。优先级比最低优先级还小则映射为最低优先级;如果高于线程最大优先级,则映射为最高优先级。

在大部分操作系统中,只有进程调度器设置为允许时,才能修改线程的优先级。进程需要获得系统特权才能执行相应操作,在基于POSIX的系统中,系统调用sched_get_priority_min()和sched_get_priority_max()来检测系统的线程优先级范围。

在传输层支持的前提下,OpenDDS将试图设置数据写者用来发送数据的套接字的DiffServ码点值。如果网络硬件支持码点值,越高的码点将具有越高的优先级,默认值0映射为码点值0(默认的)。优先级值从1到63会映射为相应的码点值,更高的取值都映射为最高值值63。OpenDDS当前版本不支持在创建数据写者之后修改transport_priority策略值。

14、LATENCY_BUDGET

LATENCY_BUDGET策略主要应用于主题,数据读者和数据写者实体,下面是LATENCY_BUDGET QoS策略的IDL:

1
2
3
struct LatencyBudgetQosPolicy {
Duration_t duration;
};

成员duration的默认值是0,表示延迟尽量小。该策略用来指示传输层发送样本的紧急程度。OpenDDS使用该值划分样本从发布者到订阅者之间的传输时间是否为不可接受的延迟间隔。目前,该策略仅适用于监视目的,若需要调节传输延迟可使用TRANSPORT_PRIORITY策略。数据写者仅使用duration做兼容性比较,如果该值为默认的0,所有向它请求的数据读者都认为是兼容的。

为了统计超过duration策略设置的延迟数据,额外增加了一个监听者扩展接口。OpenDDS::DCPS::DataReaderListener额外提供了一个通知操作,当样本接收后测量的传输时延大于LATENCY_BUDGET策略的duration时,将会触发该操作,下面是该操作方法的IDL:

1
2
3
4
5
6
7
8
9
10
struct BudgetExceededStatus {
long total_count;
long total_count_change;
DDS::InstanceHandle_t last_instance_handle;
};

void on_budget_exceeded{
in DDS::DataReader reader,
in BudgetExceededStatus status
};

要使用扩展的Listener回调,首先要从Listener实现中派生一个类,如下面代码段所示:

1
2
3
class DataReaderListenerImpl
: public virtual
OpenDDS::DCPS::LocalObject

接下来需要为on_budget_exceeded()操作提供一个非空的实现。需要注意的是,还需要为下面的扩展操作至少也提供一个空的实现:

1
2
3
4
on_subscription_disconnected()
on_subscription_reconnected()
on_subscription_lost()
on_connection_deleted()

OpenDDS还通过数据读者的扩展接口,提供了延迟统计汇总的功能,该扩展接口在OpenDDS::DCPS模型中,其IDL定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct LatencyStatistics {
GUID_t publication;
unsigned long n;
double maximum;
double minimum;
double mean;
double variance;
};

typedef sequence LatencyStatisticsSeq;

local interface DataReaderEx : DDS::DataReader {

// Obtain a sequence of statistics summaries.
void get_latency_stats( inout LatencyStatisticsSeq stats);

// Clear any intermediate statistical values.
void reset_latency_stats();

// Statistics gathering enable state.
attribute boolean statistics_enabled;

};

为了活的统计信息,需要使用上述扩展接口。也可以通过对数据读者对象的指针类型做转换,然后直接调用相应的接口。参见下面的示例代码,假定reader已经通过调用create_datareader方法正确初始化了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
DDS::DataReader_var reader = DDS::Subscriber::create_datareader();

// ...
// To start collecting new data.
dynamic_cast(reader.in())->reset_latency_stats();
dynamic_cast(reader.in())->statistics_enabled(true);

// ...
// To collect data.
OpenDDS::DCPS::LatencyStatisticsSeq stats;
dynamic_cast(reader.in())->get_latency_stats(stats);

for (unsigned long i = 0; i < stats.length(); ++i) {
std::cout << "stats[" << i << "]:" << std::endl;
std::cout << " n = " << stats[i].n << std::endl;
std::cout << " max = " << stats[i].maximum << std::endl;
std::cout << " min = " << stats[i].minimum << std::endl;
std::cout << " mean = " << stats[i].mean << std::endl;
std::cout << " variance = " << stats[i].variance << std::endl;
}

15、ENTITY_FACTORY

ENTITY_FACTORY策略控制实体创建时,是否可以自动激活,下面是关于ENTITY_FACTORY QoS策略的IDL:

1
2
3
struct EntityFactoryQosPolicy {
boolean autoenable_created_entities;
};

该策略可以应用到像工厂一样为其其它实体服务的实体,并且控制这些被工厂创建的实体是否自动激活。该策略可以应用于域参与者工厂(域参与者的工厂)、域参与者(发布者,订阅者,主题的工厂)、发布者(数据写者的工厂)、订阅者(数据读者的工厂)。该策略的默认值是true,表示工厂实体创建的其它实体可自动激活。如果应用程序希望这些实体可以在需要的时候才激活,可以把该策略的autoenable_created_entities成员的值设为false,然后手动调用实体的enable()操作来手动激活该实体。

该策略的值可以在运行的时候修改,改变将只会影响那些在之后被创建的实体。

16、PRESENTATION

PRESENTATION QoS策略控制当订阅者具有一组读者时数据样本的展示顺序。它将影响实例的变化和一定范围内实例的展示顺序。另外,该策略还引入了连续变化集合的概念,下面是PRESENTATION QoS的IDL:

1
2
3
4
5
6
7
8
9
10
11
enum PresentationQosPolicyAccessScopeKind {
INSTANCE_PRESENTATION_QOS,
TOPIC_PRESENTATION_QOS,
GROUP_PRESENTATION_QOS
};

struct PresentationQosPolicy {
PresentationQosPolicyAccessScopeKind access_scope;
boolean coherent_access;
boolean ordered_access;
};

实例改变的范围指的是在应用程序中可能涉及到的级别,分为以下几个:

  • INSTANCE_PRESENTATION_QOS(默认值)表示实例变化之间是无关的。尽管可以设置coherent_access(连续访问)和ordered_access(顺序访问),但代表相应的操作为空,也就是说,设为这个级别后对订阅者没有影响。
  • TOPIC_PRESENTATION_QOS表示获取的实例变化只限于相同的读者或写者。
  • GROUP_PRESENTATION_QOS表示获取的实例变化限制在相同发布者或订阅者上的所有实例。

coherent_access允许在一个实例上的一个或者多个改变关联合并为一个单个变化。如果数据读者没能完整的接收到所有的改变,那么相当于这些改变都是无效的。这个语义非常类似于传统关系数据库的事件处理。coherent_access的默认值为false。

ordered_access表示发布者发布的数据在读者上按顺序展示。它的效果在本质上和DESTINATION_ORDER策略很相似,不同的是,ordered_access允许数据的顺序与实例顺利无关。ordered_access的默认值为false。

注:该策略影响了订阅者获得样本的顺序和作用域,但是订阅端应用程序必须使用合适的逻辑来读取这些样本以保证请求的行为。

17、DESTINATION_ORDER

DESTINATION_ORDER QoS策略控制数据实例的样本到达数据读者的顺序,如果HISTRORY策略的depth成员变量设为默认值1,实际将影响写者所写的所有最新数据样本,下面是DESTINATION_ORDER Qos的IDL:

1
2
3
4
5
6
7
8
enum DestinationOrderQosPolicyKind {
BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS,
BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS
};

struct DestinationOrderQosPolicy {
DestinationOrderQosPolicyKind kind;
};
  • 值BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS(默认)表示读者接收到的实例的样本按接收端的时间排序。值得注意的是,对于同一个写者发出的数据样本,接收方未必按顺序接收。如果需要强制按顺序接收,另一种方式BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS值将会用到。
  • 值BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS表示数据实例的样本按发出的写者端的时间排序。需要注意的是,如果多个数据写者写相同的实例,必须保证多个写者所在的主机时钟同步。

18、WRITER_DATA_LIFECYCLE

WRITER_DATA_LIFECYCLE QoS策略控制数据写者管理的数据实例集合,下面是WRITER_DATA_LIFECYCLE QoS策略的IDL:

1
2
3
struct WriterDataLifecycleQosPolicy {
boolean autodispose_unregistered_instances;
};

当成员autodispose_unregistered_instances设为true时(默认),数据写者在实例注销时自动清除。在有的情况下,当实例注销时,这是一个不错的实例保护方式。比如,该策略允许EXCLUSIVE(独占)式的数据写者优雅地遵从后续的读者而不影响实例的状态,删除一个数据写者会隐含的注销读者保存的先前的所有样本。

19、READER_DATA_LIFECYCLE

READER_DATA_LIFECYCLE QoS策略控制数据读者管理的数据实例集合,下面是READER_DATA_LIFECYCLE QoS策略的IDL:

1
2
3
4
struct ReaderDataLifecycleQosPolicy {
Duration_t autopurge_nowriter_samples_delay;
Duration_t autopurge_disposed_samples_delay;
};

通常,数据读者保存实例的样本直到与该样本没有任何关联的写者为止,实例已经被处理,实例要么被清除,要么被应用程序读取。

在有些情况下,由于资源的使用问题,需要有合理的约束方式。例如,在故障切换的环境中,该策略允许晚加入的数据写者延长实例的生命周期。

成员autopurge_nowriter_samples_delay可控制读者在资源回收之前所要等待的时间,实例的状态转移为NOT_ALIVE_NO_WRITERS,该成员的默认值为无穷大。

成员autopurge_disposed_samples_delay可控制读者在资源回收之前所要等待的时间,实例的状态转移为NO_ALIVE_DISPOSED,该成员的默认值为无穷大。

20、TIME_BASED_FILTER

TIME_BASED_FILTER QoS策略可以用于指定接收者多长时间接收数据。对于不同的数据样本,不管发布者以多快的速度发布数据,在订阅端,都可以指定数据实例接收更新样本的最小间隔时间,下面是TIME_BASED_FILTER QoS策略的IDL:

1
2
3
struct TimeBasedFilterQosPolicy{
Duration_t minimum_separation;
};

数据读者可以指定一个时间间隔(minimum_separation),该间隔指定了实例更新的的最小时间间隔;它允许数据读者在不影响关联的数据写者状态的前提下,可以修改。默认的minimum_separation是0,表示没有间隔。该QoS策略不会节省带宽,实例值的更新仍然会发送给订阅者进程,它只会影响那些数据读者获取的样本。

21、OWNERSHIP

OWNERSHIP策略控制是否允许多个数据写者为同一个实例发布样本。所有权可以为EXCLUSIVE和SHARED方式,下面是OWNERSHIP QoS策略IDL:

1
2
3
4
5
6
7
8
enum OwnershipQosPolicyKind {
SHARED_OWNERSHIP_QOS,
EXCLUSIVE_OWNERSHIP_QOS
};

struct OwnershipQosPolicy {
OwnershipQosPolicyKind kind;
};

kind成员设为共享方式SHARED_OWNERSHIP_QOS,表示允许多个数据写者更新同一个数据实例。如果kind成员设置为独占式EXCLUSIVE_OWNERSHIP_QOS,表示只允许具有实例所有权的数据写者更新实例。实例的所有者是由OWNERSHIP_STRENGTH策略决定的;OWNERSHIP_STRENGTH策略中成员变量value取值最大的数据写者为实例的所有者。影响所有权还有一个可能的因素,具有最高值的写者是否还存活着,这个由LIVELINESS策略决定。

22、OWNERSHIP_STRENGTH

OWNERSHIP_STRENGTH策略与OWNERSHI策略配合使用,当OWNERSHI的kind设为独占式EXCLUSIVE时有效,下面是OWNERSHIP_STRENGTH QoS策略的IDL:

1
2
3
struct OwnershipStrengthQosPolicy {
long value;
};

value成员变量用来确定数据写者是否是数据实例的所有权者,它的默认值为0。

策略使用示例

下面的示例代码演示了Qos策略在发布端的应用:

1
2
3
4
5
6
7
8
9
10
11
12
DDS::DataWriterQos dw_qos;
pub->get_default_datawriter_qos(dw_qos);

dw_qos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
dw_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
dw_qos.reliability.max_blocking_time.sec = 10;
dw_qos.reliability.max_blocking_time.nanosec = 0;
dw_qos.resource_limits.max_samples_per_instance = 100;

DDS::DataWriter_var dw =
pub->create_datawriter(topic, dw_qos, 0, // No listener
OpenDDS::DCPS::DEFAULT_STATUS_MASK);

上述代码创建了发布端的数据写者,试用了以下几种Qos策略:

  • HISTORY 策略,取值为保持所有;
  • REALIBAILITY 策略,设置可靠类型的最大阻塞时间为10秒;
  • RESOURCE_LIMIT策略,设置每个实例的的最大样本数为100。

以上策略的组合意味着:当100个样本等待传送时,写者在返回错误码前可以阻塞多达10秒。如果相同的策略应用在数据读者上,置意味着:直至有100个未读的样本排队之后,允许有样本被拒收。被拒绝的样本会被丢弃掉,并更新SampleRejectStatus(样本拒收状态)。

有关OpenDDS的相关问题欢迎发送邮件至lyingbo@aliyun.com一起讨论

OpenDDS之Qos策略(2)

OpenDDS主要支持22种Qos,前一篇OpenDDS之Qos策略(1)已对Qos整体、以及默认值做了介绍,接下来介绍这22种Qos的具体内容和用法,由于内容较多,分两篇来完成。

1、LIVELINESS

LIVELINESS策略主要应用于主题,数据写者,数据读者实体中。在主题中设置该策略,意味着对于所有发布订阅该主题的数据写者、数据读者都是有效的,下面是LIVELINESS QoS策略的IDL:

1
2
3
4
5
6
7
8
9
10
enum LivelinessQosPolicyKind {
AUTOMATIC_LIVELINESS_QOS,
MANUAL_BY_PARTICIPANT_LIVELINESS_QOS,
MANUAL_BY_TOPIC_LIVELINESS_QOS
};

struct LivelinessQosPolicy {
LivelinessQosPolicyKind kind;
Duration_t lease_duration;
};

LIVELINESS策略控制服务何时以及如何检测参与者是否还存活,存活表示参与者仍然处于可访问和激活状态。kind成员设置检测方式为自动或是实体手动检测,把kind设为AUTOMATIC_LIVELINESS_QOS,表示服务如果在lease_duration时间周期内没有任何网络流量,则自动的发出表示参与者存活的信息。把kind设为MANUAL_BY_PARTICIPANT_LIVELINESS_QOS或者是MANUAL_BY_TOPIC_LIVELINESS_QOS表示实体要在指定的心跳间隔内写入数据样本或者发送确认在线的消息,心跳间隔时间由lease_duration字段指定。lease_duration的默认值为无穷大,表示禁用存活检测。

手动发布存活消息而不发布数据样本,可以在指定的心跳间隔时间内通过调用数据写者或者域参与者的assert_liveliness()操作来实现。

数据写者指定(提供)自己的心跳数据准则,数据读者指定期望的写者的心跳数据。如果没有在心跳间隔期间内收到写者的数据(既没有样本数据,也没有确认存活的消息),将会触发LIVELINESS_CHANGED_STATUS通信状态改变,并通知应用程序(通过调用数据写者监听者的on_liveliness_changed()回调函数)。

该策略的兼容性是在数据读者和数据写者之间关联关系建立的时候检测的,要建立关联关系,必须保证两端的策略值的兼容性。数据读者需要的策略和数据写者提供的策略的比较值决定了相容性。在决定相容性的时候,两种策略(AUTO、MANUAL)都需要和lease_duration的值相比较。写者提供的策略值必须大于或等于读者需要的策略值。该策略值的排序如下:

1
MANUAL_BY_TOPIC_LIVELINESS_QOS > MANUAL_BY_PARTICIPANT_LIVELINESS_QOS > AUTOMATIC_LIVELINESS_QOS

此外,写者提供的lease_duration必须小于或者等于读者的lease_duration。这些情况都必须符合提供、请求策略的兼容性要求,考虑兼容性和关联关系的建立。

2、RELIABILITY

RELIABILITY策略主要应用于主题,数据读者,数据写者实体中,下面是RELIABILITY QoS策略的IDL:

1
2
3
4
5
6
7
8
9
enum ReliabilityQosPolicyKind {
BEST_EFFORT_RELIABILITY_QOS,
RELIABLE_RELIABILITY_QOS
};

struct ReliabilityQosPolicy {
ReliabilityQosPolicyKind kind;
Duration_t max_blocking_time;
};

该策略控制数据读者处理样本数据的方式。把kind设置为BEST_EFFORT_RELIABILITY_QOS表示数据读写者尽力交付,不保证样本数据的可靠性传输,在某些情况下,允许样本丢弃。把kind设置为RELIABLE_RELIABILITY_QOS表示服务最终把样本传送给合适的数据读者。

当HISTORY QoS策略设置为KEEP_ALL_HISTORY_QOS,并且数据写者在写数据遇到资源限制(因为传输反压,详见6)时,会用到该策略的max_blocking_time字段。当这种情况发生时,数据写者会阻塞,如果阻塞时间超过了max_blocking_time指定的值,再调用写操作就会返回一个写超时的错误码。对于数据读者和主题,该策略的默认值是“BEST_EFFORT,对于数据写者,默认值是RELIABLE。

该策略的兼容性在数据写者和数据读者建立关联关系的时候,就需要考虑。要成功建立关联关系,两边的取值一定要相互兼容。该策略的兼容性要求数据写者的策略值一定要大于或等于数据读者的策略值。

3、HISTORY

HISTORY策略指定数据读者和数据写者保留特定实例样本的数量。对于数据写者,这些样本被保存直到发布者取出它们并成功的发送给所有关联的订阅者;对于数据读者,这些样本被保存直到应用程序取出它们。该策略主要应用于主题、数据写者、数据读者实体,下面是HISTORY QoS策略的IDL:

1
2
3
4
5
6
7
8
9
enum HistoryQosPolicyKind {
KEEP_LAST_HISTORY_QOS,
KEEP_ALL_HISTORY_QOS
};

struct HistoryQosPolicy {
HistoryQosPolicyKind kind;
long depth;
};

取值KEEP_ALL_HISTORY_QOS表示所有的样本都将保存。当指定KEEP_ALL_HISTORY_QOS并且未读取的样本数量等于资源限制策略所设置的max_samples_per_instance时,之后新到来的样本都将被拒收。取值KEEP_LAST_HISTORY_QOS表示只保存depth条最新样本。当数据写者保存的样本条数已经达到depth时,新写入的样本会被排入待发送队列中,而最早写入的样本将会被丢弃。

该策略的默认值是“KEEP_LAST_HISTORY_QOS”,并且depth为1。

4、DURABILITY

DURABILITY策略控制数据写者是否管理已发出的样本。该策略主要应用于主题,数据写者,数据读者实体,下面是DURABILITY QoS策略的IDL:

1
2
3
4
5
6
7
8
9
10
enum DurabilityQosPolicyKind {
VOLATILE_DURABILITY_QOS, // Least Durability
TRANSIENT_LOCAL_DURABILITY_QOS,
TRANSIENT_DURABILITY_QOS,
PERSISTENT_DURABILITY_QOS // Greatest Durability
};

struct DurabilityQosPolicy {
DurabilityQosPolicyKind kind;
};

kind的默认值是VOLATILE_DURABILITY_QOS:

  • 为VOLATILE_DURABILITY_QOS表示样本在发送给已知的订阅者之后即丢弃。因此,订阅者不能找回在它与发布者建立连接之前的任何样本。
  • 为TRANSIENT_LOCAL_DURABILITY_QOS表示数据读者一旦和数据写者建立连接,写者将发送历史中的所有样本。
  • 为TRANSIENT_DURABILITY_QOS表示只要进程没有退出,数据写者的样本就会在内存保留,但不保存在外部存储器中。当同一个域中的数据读者订阅了相同的主题及分区时,写者把所有缓存的样本全部发送出去。
  • 为PERSISTENT_DURABILITY_QOS表示除了提供与TRANSIENT_DURABILITY_QOS相同的功能外,还把样本保留在持久的外部存储器中,即便进程退出,样本仍然可以幸存。

一旦该策略被指定为TRANSIENT或PERSISTENT时,需要设置DURABILITY_SERVICE QoS策略调节持久度缓存的参数。

该策略的兼容性需要在数据写者和数据读者之间建立关联关系的时候考虑。要成功建立关联关系,两边的值一定要兼容。要求数据写者的持久性值一定要大于或等于数据读者的值。持久性大小关系如下:

1
PERSISTENT_DURABILITY_QOS > TRANSIENT_DURABILITY_QOS > TRANSIENT_LOCAL_DURABILITY_QOS > VOLATILE_DURABILITY_QOS

5、DURABILITY_SERVICE

DURABILITY_SERVICE策略控制TRANSIENT 或 PERSISTENT的持久缓存中样本的删除。该策略主要应用于主题,数据写者实体。该策略还提供了一种用于设置HISTORY 和RESOURCE_LIMITS策略关于样本缓冲参数的方式,下面是DURABILITY_SERVICE QoS策略的IDL:

1
2
3
4
5
6
7
8
struct DurabilityServiceQosPolicy {
Duration_t service_cleanup_delay;
HistoryQosPolicyKind history_kind;
long history_depth;
Long max_samples;
long max_instances;
long max_samples_per_instance;
};

该策略中的字段与HISTORY 和RESOURCE_LIMITS中的成员有些相似,但与它们无关。字段service_cleanup_delay可以按需要设定,默认值为0,表示不删除缓存中的样本。

6、RESOURCE_LIMITS

RESOURCE_LIMITS策略指定可以消耗的资源的数量。该策略主要应用于主题,数据写者,数据读者实体,下面是RESOURCE_LIMITS QoS策略的IDL:

1
2
3
4
5
struct ResourceLimitsQosPolicy {
long max_samples;
long max_instances;
long max_samples_per_instance;
};

成员max_samples表示一个数据写者或数据读者可以管理的所有实例的样本总数。成员max_instances表示数据写者或数据读者可以管理的最大实例数。 成员max_samples_per_instance表示一个数据写者或数据读者可以管理的单个实例的最大样本数。以上成员的默认值都是DDS::LENGTH_UNLIMITED,表示无限制。

该策略应用于数据写者时,写者将把因为传输后压还没有发送给数据读者的样本排队发送;该策略应用于数据读者时,读者将把已接收但还未读取/使用的样本缓存在队列中。

7、PARTITION

PARTITION QoS 策略允许在一个域中创建逻辑分区。仅当数据读者和数据写者的分区字符串匹配时,才允许在二者间建立关联关系。该策略主要应用于发布者和订阅者实体,下面是PARTITION QoS策略的IDL:

1
2
3
struct PartitionQosPolicy {
StringSeq name;
};

name成员默认值为空,表示实体参与到默认的分区中。分区名可以包含通配符,通配符的规则与POSIX的fnmatch函数(POSIX 1003.2-1992 section B.6)一致。

数据读者和数据写者的关联关系的建立依赖于发布端和订阅端的分区字符串的匹配。如果分区匹配失败,不会触发任何回调或者设置状态值。

该策略的值可以在运行时修改,此修改会引起关联关系的移除或添加。

8、DEADLINE

DEADLINE QoS策略允许应用程序在指定的时间内检测数据是否被写入或者读取。该策略主要应用于主题、数据写者、数据读者实体,下面是DEADLINE QoS策略的IDL:

1
2
3
struct DeadlineQosPolicy {
Duration_t period;
};

period成员的默认值是无穷大,表示该策略不起作用。如果该策略的值设定为有限值时,数据写者将会监视应用程序的写入行为,当在设定的期限内没有数据写入时,DDS会设置相应的状态条件并触发on_offered_deadline_missed()回调。如果数据读者在设定的期限内没有收到数据时,DDS也会设置相应的状态条件并触发on_offered_deadline_missed()回调。

该策略的兼容性,在数据写者和数据读者建立关联关系的时候就需要考虑。要求:数据读者的period值一定不能小于数据写者的值。

当关联实体被启动之后,该策略的值也是可以修改的。在这种情况下,只有修改后的策略值与关联的实体的策略值一致,才能修改成功。如果该策略用于主题,那么该策略的修改仅仅会影响在改变之后创建的数据读者和数据写者,任何已经创建的读者或者写者,不会受到影响。

9、LIFESPAN

LIFESPAN QoS策略允许应用程序指定一个样本什么时候失效。已经失效的样本不会传送给订阅者。该策略主要应用于主题,数据写者实体,下面是LIFESPAN的QoS策略的IDL:

1
2
3
struct LifespanQosPolicy {
Duration_t duration;
}

duration成员的默认值是无穷大,表示样本永不失效。OpenDDS1.5支持的LIFESPAN检测是当发布者使用DURABILITY kind时有效。当前的OpenDDS实现并不移除那些已经放入数据读者和数据写者缓存之后失效的样本。

该策略的值可以在运行时修改,修改后的策略只影响之后发布的数据。

10、USER_DATA

USER_DATA策略为应用程序提供保存附加信息的方式,该策略主要应用于域参与者、数据读者、数据写者实体,下面是USER_DATA QoS策略的IDL:

1
2
3
struct UserDataQosPolicy {
sequence value;
};

Value的默认值为空,表示没有任何附加信息。该字段可以设为任意次序的8-bit,以把信息附加到已经创建的实体中。USER_DATA策略在各自的内建主题数据中都是可以获得的。远端的应用程序通过内建主题获得信息,并根据自己的目的来使用这些信息。例如,应用程序可以使用USER_DATA策略附加安全凭证来认证信息源。

11、TOPIC_DATA

TOPIC_DATA策略主要应用于主题,下面是TOPIC_DATA QoS策略的IDL:

1
2
3
struct TopicDataQosPolicy {
sequence value;
};

Value的默认值为空,表示没有任何附加信息。该字段可以设为任意次序的8-bit额外信息到主题中。TOPIC_DATA策略对、数据读者,数据写者、主题和内建主题数据有效。远程的应用程序可以通过内建主题获取这些信息,并应用于应用程序自行定义。

12、GROUP_DATA

GROUP_DATA策略主要应用于发布者和订阅者实体,下面是GROUP_DATA QoS策略的IDL:

1
2
3
struct GroupDataQosPolicy {
sequence value;
};

Value的默认值为空,表示没有任何附加信息。该字段可以设为任意次序的8-bit。GROUP_DATA的值通过内建主题传播。发布端数据写者的内置主题包括GROUP_DATA的值,订阅端则是数据读者包括GROUP_DATA的值。GROUP_DATA策略可以用来实现和在7中描述的PARTITION策略相似的机制。

有关OpenDDS的相关问题欢迎发送邮件至lyingbo@aliyun.com一起讨论

OpenDDS之Qos策略(1)

简介

OpenDDS对DDS规范(OMG Document formal/07-01-01)定义的22种QoS策略全部支持。

应用程序通过参与者的QoS约束,来指定需要的行为,然后由DDS的服务决定如何实现这些行为,这些策略应用于所有DDS实体(主题、数据写者、数据读者、发布者、订阅者、域参与者),但不是所有的策略都适用于所有的实体类型。

发布者和订阅者通过RxO模式相匹配,订阅者请求一组策略,发布者提供一组QoS策略给潜在的订阅者,然后DDS试图将请求的策略和提供的策略相匹配,如果这些策略相匹配则将它们关联起来。

在前面的《基于OpenDDS的应用程序开发》博文中,我们采用了默认的QoS策略,接下来的几篇博文将详细介绍每个QoS策略的相关细节。

Qos策略

每个策略都定义了自己的结构,每个实体都支持策略的一个子集,并且定义了一个由被支持的策略结构体所组成的QoS结构体,一个给定实体的可用策略集不会受到QoS结构体中包含的策略结构体的限制,例如,发布者的QoS结构体的IDL定义格式如下:

1
2
3
4
5
6
7
8
module DDS {
struct PublisherQos {
PresentationQosPolicy presentation;
PartitionQosPolicy partition;
GroupDataQosPolicy group_data;
EntityFactoryQosPolicy entity_factory;
};
};

设置一个策略就像获得一个已经有默认值的结构体一样简单,并在必要的时候修改个别策略值,然后将QoS结构体应用到实体中(通常是在实体被建立的时候),后续的章节中将详细说明如何获取各个实体的缺省QoS值。

应用程序可以通过调用set_qos()操作来改变任意实体的QoS策略。如果该QoS策略变得不再兼容,则现有的连接将会移除,否则,如果该Qos策略变得兼容了,将会有新的连接添加。该改变将会通过相应的实体调用QoS更新操作通知DCPSInfoRepo,DCPSInfoRepo将会依据Qos策略规范来重新检测兼容性和关联性,如果兼容性检测失败,set_qos()调用将会返回错误,关联性检测将以移除现有的连接或增加新的连接为结果。

如果试图去改变一个不能改变的QoS策略,set_qos()操作将会返回DDS::RETCODE_IMMUTABLE_POLICY。可以改变的QoS策略有如下几个:USER_DATA,TOPIC_DATA,LIFESPAN,OWNERSHIP_STRENGTH,TIME_BASED_FILTER,ENTITY_FACTORY,WRITER_DATA_LIFECYCLE,READER_DATA_LIFECYCLE,它们不需要重新检测兼容性和关联性。QoS策略DEADLINE和LATENCY_BUDGET需要重新检测相容性,关联性不需要。QoS策略PARTITION恰好相反,只需重新检测关联性,对于兼容性则不需重新检测。

默认Qos策略值

应用程序获取实体的默认QoS策略,通过实例化该实体的一个对应的Qos结构体,然后调用实体工厂的get_default_entity_qos()操作将它获取回来(例如,对于发布者和订阅者,可以通过使用域参与者来获取默认QoS)。

下面的示例代码说明了对于发布者、订阅者、主题、域参与者、数据写者、数据读者如何获取默认QoS:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Get default Publisher QoS from a DomainParticipant:
DDS::PublisherQos pub_qos;
DDS::ReturnCode_t ret;
ret = domain_participant->get_default_publisher_qos(pub_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default publisher QoS" << std::endl;
}

// Get default Subscriber QoS from a DomainParticipant:
DDS::SubscriberQos sub_qos;
ret = domain_participant->get_default_subscriber_qos(sub_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default subscriber QoS" << std::endl;
}

// Get default Topic QoS from a DomainParticipant:
DDS::TopicQos topic_qos;
ret = domain_participant->get_default_topic_qos(topic_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default topic QoS" << std::endl;
}

// Get default DomainParticipant QoS from a DomainParticipantFactory:
DDS::DomainParticipantQos dp_qos;
ret = domain_participant_factory->get_default_participant_qos(dp_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default participant QoS" << std::endl;
}

// Get default DataWriter QoS from a Publisher:
DDS::DataWriterQos dw_qos;
ret = pub->get_default_datawriter_qos(dw_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default data writer QoS" << std::endl;
}

// Get default DataReader QoS from a Subscriber:
DDS::DataReaderQos dr_qos;
ret = pub->get_default_datareader_qos(dr_qos);
if (DDS::RETCODE_OK != ret) {
std::cerr << "Could not get default data reader QoS" << std::endl;
}

下面的表格总结了在OpenDDS中用到的每一种实体类型的默认QoS策略。

表1、默认域参与者QoS策略

策略 成员 默认值
USER_DATA value (not set)
ENTITY_FACTORY autoenable_created_entities true

表2、默认主题QoS策略

策略 成员 默认值
TOPIC_DATA value (not set)
DURABILITY kind VOLATILE_DURABILITY_QOS
DURABILITY_SERVICE service_cleanup_delay.sec DURATION_ZERO_SEC
service_cleanup_delay.nanosec DURATION_ZERO_NSEC
history_kind KEEP_LAST_HISTORY_QOS
history_depth 1
max_samples LENGTH_UNLIMITED
max_instances LENGTH_UNLIMITED
max_samples_per_instance LENGTH_UNLIMITED
DEADLINE period.sec DURATION_INFINITY_SEC
period.nanosec DURATION_INFINITY_NSEC
LATENCY_BUDGET duration.sec DURATION_ZERO_SEC
duration.nanosec DURATION_ZERO_NSEC
LIVELINESS kind AUTOMATIC_LIVELINESS_QOS
lease_duration.sec DURATION_INFINITY_SEC
lease_duration.nanosec DURATION_INFINITY_NSEC
RELIABILITY kind BEST_EFFORT_RELIABILITY_QOS
max_blocking_time.sec DURATION_INFINITY_SEC
max_blocking_time.nanosec DURATION_INFINITY_NSEC
DESTINATION_ORDER kind BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS
HISTORY kind KEEP_LAST_HISTORY_QOS
depth 1
RESOURCE_LIMITS max_samples LENGTH_UNLIMITED
max_instances LENGTH_UNLIMITED
max_samples_per_instance LENGTH_UNLIMITED
TRANSPORT_PRIORITY value 0
LIFESPAN duration.sec DURATION_INFINITY_SEC
duration.nanosec DURATION_INFINITY_NSEC
OWNERSHIP kind SHARED_OWNERSHIP_QOS

表3、默认发布者QoS策略

策略 成员 默认值
PRESENTATION access_scope INSTANCE_PRESENTATION_QOS
coherent_access 0
ordered_access 0
PARTITION name (empty sequence)
GROUP_DATA value (not set)
ENTITY_FACTORY autoenable_created_entities true

表4、默认订阅者QoS策略

策略 成员 默认值
PRESENTATION access_scope INSTANCE_PRESENTATION_QOS
coherent_access 0
ordered_access 0
PARTITION name (empty sequence)
GROUP_DATA value (not set)
ENTITY_FACTORY autoenable_created_entities true

表5、默认数据写者QoS策略

策略 成员 默认值
DESTINATION_ORDER kind BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS
HISTORY kind KEEP_LAST_HISTORY_QOS
depth 1
RESOURCE_LIMITS max_samples LENGTH_UNLIMITED
max_instances LENGTH_UNLIMITED
max_samples_per_instance LENGTH_UNLIMITED
TRANSPORT_PRIORITY value 0
LIFESPAN duration.sec DURATION_INFINITY_SEC
duration.nanosec DURATION_INFINITY_NSEC
USER_DATA value (not set)
OWNERSHIP kind SHARED_OWNERSHIP_QOS
OWNERSHIP_STRENGTH value 0
WRITER_DATA_LIFECYCLE autodispose_unregistered_instances 1
DURABILITY kind VOLATILE_DURABILITY_QOS
DURABILITY_SERVICE service_cleanup_delay.sec DURATION_ZERO_SEC
service_cleanup_delay.nanosec DURATION_ZERO_NSEC
history_kind KEEP_LAST_HISTORY_QOS
history_depth 1
max_samples LENGTH_UNLIMITED
max_instances LENGTH_UNLIMITED
max_samples_per_instance LENGTH_UNLIMITED
DEADLINE period.sec DURATION_INFINITY_SEC
period.nanosec DURATION_INFINITY_NSEC
LATENCY_BUDGET duration.sec DURATION_ZERO_SEC
duration.nanosec DURATION_ZERO_NSEC
LIVELINESS kind AUTOMATIC_LIVELINESS_QOS
lease_duration.sec DURATION_INFINITY_SEC
lease_duration.nanosec DURATION_INFINITY_NSEC
RELIABILITY kind RELIABLE_RELIABILITY_QOS
max_blocking_time.sec 0
max_blocking_time.nanosec 100000000 (100 ms)

表6、默认数据读者QoS策略

策略 成员 默认值
DURABILITY kind VOLATILE_DURABILITY_QOS
DEADLINE period.sec DURATION_INFINITY_SEC
period.nanosec DURATION_INFINITY_NSEC
LATENCY_BUDGET duration.sec DURATION_ZERO_SEC
duration.nanosec DURATION_ZERO_NSEC
LIVELINESS kind AUTOMATIC_LIVELINESS_QOS
lease_duration.sec DURATION_INFINITY_SEC
lease_duration.nanosec DURATION_INFINITY_NSEC
RELIABILITY kind BEST_EFFORT_RELIABILITY_QOS
max_blocking_time.sec DURATION_INFINITY_SEC
max_blocking_time.nanosec DURATION_INFINITY_NSEC
DESTINATION_ORDER kind BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS
HISTORY kind KEEP_LAST_HISTORY_QOS
depth 1
RESOURCE_LIMITS max_samples LENGTH_UNLIMITED
max_instances LENGTH_UNLIMITED
max_samples_per_instance LENGTH_UNLIMITED
USER_DATA value (not set)
OWNERSHIP kind SHARED_OWNERSHIP_QOS
TIME_BASED_FILTER minimum_separation.sec DURATION_ZERO_SEC
minimum_separation.nanosec DURATION_ZERO_NSEC
READER_DATA_LIFECYCLE autopurge_nowriter_samples_delay.sec DURATION_INFINITY_SEC
autopurge_nowriter_samples_delay.nanosec DURATION_INFINITY_NSEC
autopurge_disposed_samples_delay.sec DURATION_INFINITY_SEC
autopurge_disposed_samples_delay.nanosec DURATION_INFINITY_NSEC

有关OpenDDS的相关问题欢迎发送邮件至lyingbo@aliyun.com一起讨论

基于OpenDDS应用程序开发(3)订阅端实现

连续的三篇博文演示如何基于OpenDDS开发应用程序,将数据从发布端节点发送到订阅端节点,该示例程序由一个发布者发布数据,一个订阅者订阅数据,使用默认的QoS策略和TCP/IP传输方式。

本文是第三篇,主要介绍开发一个简单的OpenDDS订阅端应用程序所涉及的步骤。省略一些不重要部分(如:#include部分和异常处理等)代码,只写出关键代码。

1、新建订阅端工程:

参考前一博文中MPC的用法,在Demo.mpc文件中增加如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 project(*Subscriber) : dcpsexe_with_tcp {

exename = subscriber
after += *idl

TypeSupport_Files {
Demo.idl
}

Source_Files {
Subscriber.cpp
DataReaderListenerImpl.cpp
}
}

Subscriber工程从父工程dcpsexe_with_tcp继承,这里直接使用idl工程中定义好的Demo.idl文件。

之后在Demo目录下新建三个文件:Subscriber.cpp、DataReaderListenerImpl.h、DataReaderListenerImpl.cpp,分别用来编写订阅端逻辑部分代码,并再次使用如下命令来生成Vs2008工程:

1
mwc.pl -type vc9

生成完成之后,使用Vs2008打开Demo.sln,就可以修改订阅端代码了:

2、初始化参与者:

初始化订阅端参与者代码同发布端是完全一样的,在Subscriber.cpp文件中增加如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int main(int argc, char *argv[])
{
try {

DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);

DDS::DomainParticipant_var participant =
dpf->create_participant(42, // Domain ID
PARTICIPANT_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!participant) {
std::cerr << "create_participant failed." << std::endl;
return 1 ;
}

3、注册数据类型并创建主题:

接下来,初始化数据类型和主题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Demo::PosTypeSupport_var mts = new Demo::PosTypeSupportImpl();
if (DDS::RETCODE_OK != mts->register_type(participant, "")) {
std::cerr << "Failed to register the PosTypeSupport." << std::endl;
return 1;
}

CORBA::String_var type_name = mts->get_type_name();
DDS::Topic_var topic =
participant->create_topic("Pos Demo",
type_name,
TOPIC_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!topic) {
std::cerr << "Failed to create_topic." << std::endl;
return 1;
}

4、创建订阅者:

调用create_subscriber()操作创建一个带有默认QoS策略的订阅者:

1
2
3
4
5
6
7
8
DDS::Subscriber_var sub =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!sub) {
std::cerr << "Failed to create_subscriber." << std::endl;
return 1;
}

5、创建数据读者及监听者:

订阅端需要给数据读者关联一个监听者,用来接收数据的到达,下面的代码定义了一个监听者对象,类DataReaderListenerImpl的实现将在下一部分介绍。

1
DDS::DataReaderListener_var listener(new DataReaderListenerImpl);

现在采用默认的QoS策略创建数据读者,并将它与主题、刚刚创建的监听者对象相关联起来:

1
2
3
4
5
6
7
8
9
DDS::DataReader_var dr =
sub->create_datareader(topic,
DATAREADER_QOS_DEFAULT,
listener,
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!dr) {
std::cerr << "create_datareader failed." << std::endl;
return 1;
}

之后,主线程就可以自由的去处理其它工作了,当有数据到达时,OpenDDS会调用监听者对象的回调接口通知,只需要在DataReaderListenerImpl类的回调函数中接收需要的数据就可以了。

6、数据读者监听者实现:

监听者类继承自DDS规范的DDS::DataReaderListener接口,该接口定义了一些回调函数,每个回调函数被调用时,就是一个事件的通知,如:断开、重连等,以下是DataReaderListener接口的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
module DDS {

local interface DataReaderListener : Listener {

void on_requested_deadline_missed(in DataReader reader,
in RequestedDeadlineMissedStatus status);

void on_requested_incompatible_qos(in DataReader reader,
in RequestedIncompatibleQosStatus status);

void on_sample_rejected(in DataReader reader,
in SampleRejectedStatus status);

void on_liveliness_changed(in DataReader reader,
in LivelinessChangedStatus status);

void on_data_available(in DataReader reader);

void on_subscription_matched(in DataReader reader,
in SubscriptionMatchedStatus status);

void on_sample_lost(in DataReader reader, in SampleLostStatus status);

};
};

在本例的DataReaderListenerImpl类中真正需要的实现的回调接口是on_data_available(),它也是我们需要重新派生该类的唯一成员函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) {

num_reads_ ++;

try{

Demo::PosDataReader_var reader_i = Demo::PosDataReader::_narrow(reader);
if (!reader_i) {
std::cerr << "read: _narrow failed." << std::endl;
return;
}

Demo::Pos pos;
DDS::SampleInfo si ;
DDS::ReturnCode_t status = reader_i->take_next_sample(pos, si) ;
if (status == DDS::RETCODE_OK) {

if (si.valid_data == 1) {
std::cout << " Pos:pos_id = " << pos. pos_id << std::endl
<< " pos_x = " << pos. pos_x << std::endl
<< " pos_y = " << pos. pos_y << std::endl;
} else if (si.instance_state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) {
std::cout << "instance is disposed" << std::endl;
} else if (si.instance_state == DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) {
std::cout << "instance is unregistered" << std::endl;
} else {
std::cerr << "ERROR: received unknown instance state "
<< si.instance_state << std::endl;
}

} else if (status == DDS::RETCODE_NO_DATA) {
cerr << "ERROR: reader received DDS::RETCODE_NO_DATA!" << std::endl;
} else {
cerr << "ERROR: read Pos: " << status << std::endl;
}

上面的代码将样本从数据读者中取出,如果成功并能返回有效数据,就打印出接收到数据的每一个字段。

每当有样本数据到达时,该函数就会被调用。

7、实体清理:

在订阅完数据以后,需要清理与OpenDDS相关联的资源:

1
2
3
participant->delete_contained_entities();
dpf->delete_participant(participant);
TheServiceParticipant->shutdown();

调用域参与者的delete_contained_entities()操作删除所有该参与者创建的主题、订阅者。一旦执行完该操作,就可以使用域参与者工厂删除域参与者了。

8、示例程序运行:

修改完以上代码并编译完成,就可以运行订阅端应用程序了,需要先运行DDS的信息仓库,开始中打开一个CMD窗口,执行如下命令:

1
%DDS_ROOT%/bin/DCPSInfoRepo  -ORBListenEndpoints  iiop://localhost:12345

再次打开一个CMD窗口,cd到Demo目录下,执行如下命令:

1
subscriber -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo

至此,订阅端应用程序就开发完成并运行起来了。

有关OpenDDS的相关问题欢迎发送邮件至lyingbo@aliyun.com一起讨论

基于OpenDDS应用程序开发(2)公布端实现

连续的三篇博文演示如何基于OpenDDS开发应用程序,将数据从发布端节点发送到订阅端节点,该示例程序由一个发布者发布数据,一个订阅者订阅数据,使用默认的QoS策略和TCP/IP传输方式。

本文是第二篇,主要介绍开发一个简单的OpenDDS公布端应用程序所涉及的步骤,省略一些不重要部分(如:#include部分和异常处理等)代码,只写出关键代码。

1、新建公布端工程:

参考前一博文中MPC的用法,在Demo.mpc文件中增加如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
project(*Publisher) : dcpsexe_with_tcp {

exename = publisher
after += *idl

TypeSupport_Files {
Demo.idl
}

Source_Files {
Publisher.cpp
}
}

Publisher工程从父工程dcpsexe_with_tcp继承,这里直接使用idl工程中定义好的Demo.idl文件,之后在Demo目录下新建一个Publisher.cpp文件,用来编写公布端逻辑部分代码,并再次使用如下命令来生成Vs2008工程:

1
mwc.pl -type vc9

生成完成之后,使用Vs2008打开Demo.sln就可以修改Publisher.cpp中的代码了:

2、初始化参与者:

main()函数的第一部分是为当前进程初始化一个OpenDDS参与者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int main(int argc, char *argv[]) {

try {

DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);

DDS::DomainParticipant_var participant =
dpf->create_participant(42, // domain ID
PARTICIPANT_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!participant) {
std::cerr << "create_participant failed." << std::endl;
return 1;
}

调用宏TheParticipantFactoryWithArgs,使用命令行参数初始化参与者工厂,这些命令行参数用来初始化ORB服务。

调用create_participant()操作,使用默认的QoS策略,在域参与者工厂中注册一个域参与者,并指定域ID为42,使用DDS默认的状态掩码,确保所有在中间件中的相关通信状态改变都能传递到应用程序中,域ID可以是在0x0~0x7FFFFFFF范围内的任意值,返回域参与者对象的引用,用来注册待公布的数据类型。

3、注册数据类型并创建主题:

首先,new一个PosTypeSupportImpl对象,然后调用register_type()操作注册一个带有类型名称的类型,示例中,使用空的类型名称,DDS缺省会把PosTypeSupport接口标识符作为该类型的名称,当然,也可以使用像“Pos”这样的特定类型名称。

1
2
3
4
5
 Demo::PosTypeSupport_var mts = new Demo::PosTypeSupportImpl();
if (DDS::RETCODE_OK != mts->register_type(participant, "")) {
std::cerr << "register_type failed." << std::endl;
return 1;
}

接下来,从类型支持对象中获得注册的类型名称,调用create_topic()操作来创建主题。

1
2
3
4
5
6
7
8
9
10
11
12
CORBA::String_var type_name = mts->get_type_name();

DDS::Topic_var topic =
participant->create_topic("Pos Demo",
type_name,
TOPIC_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!topic) {
std::cerr << "create_topic failed." << std::endl;
return 1;
}

如上代码所示,创建了一个名称为“Pos Demo”,默认主题类型和默认QoS策略的主题,接下来再创建公布者。

4、创建公布者:

调用create_publisher()操作创建一个带有默认公布者QoS策略的公布者。

1
2
3
4
5
6
7
8
DDS::Publisher_var pub =
participant->create_publisher(PUBLISHER_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!pub) {
std::cerr << "create_publisher failed." << std::endl;
return 1;
}

5、创建数据写者:

有了公布者,再调用create_datawriter()操作创建一个数据写者。

1
2
3
4
5
6
7
8
9
   DDS::DataWriter_var writer =
pub->create_datawriter(topic,
DATAWRITER_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!writer) {
std::cerr << "create_datawriter failed." << std::endl;
return 1;
}

在创建数据写者的时候,使用已经创建好的主题,默认的QoS 策略和空的监听者。然后将数据写者引用转换为PosDataWriter对象引用,方便使用数据写者类中已经定义好的接口。

1
Demo::PosDataWriter_var pos_writer = Demo::PosDataWriter::_narrow(writer);

6、公布数据:

创建好数据写者,就可以公布数据了,先初始化要公布的对象pos的各个字段,之后调用数据写者的write接口公布数据:

1
2
3
4
5
6
7
8
9
10
11
12
Demo::Pos pos;
pos.pos_id = 99;
pos. pos_x = 99;
pos. pos_y = 99;
for (int i = 0; i < 10; ++i) {
DDS::ReturnCode_t error = pos_writer->write(pos, DDS::HANDLE_NIL);
++pos.pos_id;
if (error != DDS::RETCODE_OK) {
// Log or otherwise handle the error condition
return 1;
}
}

对于每个循环,调用write()操作将样本数据发送给所有注册过该主题的订阅者。

7、等待接收:

由于DDS中的数据公布和数据订阅是解耦的,数据不保证一定交付。如果公布端应用程序要求所有公布的数据必须全部交付,需要在公布端调用数据写者的wait_for_acknowledgements()操作,来使公布端应用程序一直等待,直到订阅端接收到所有已经公布的数据。要使wait_for_acknowledgements()操作有效,数据读者必须设置RELIABILITY QoS策略(是缺省值)为RELIABLE。

数据写者调用此操作,并绑定一个timeout值作为等待的超时时间。如下的代码演示了调用wait_for_acknowledgements()阻塞15s等待订阅端接收所有数据的方法:

1
2
3
4
5
6
7
8
9
 DDS::Duration_t shutdown_delay = {15, 0};

DDS::ReturnCode_t result;
result = writer->wait_for_acknowledgments(shutdown_delay);
if( result != DDS::RETCODE_OK) {
std::cerr << "Failed while waiting for acknowledgment of "
<< "data being received by subscriptions, some data "
<< "may not have been delivered." << std::endl;
}

8、实体清理:

在公布完数据以后,需要清理与OpenDDS相关联的资源:

1
2
3
participant->delete_contained_entities();
dpf->delete_participant(participant);
TheServiceParticipant->shutdown();

调用域参与者的delete_contained_entities()操作删除所有该参与者创建的主题、公布者。一旦执行完该操作,就可以使用域参与者工厂删除域参与者了。

9、示例程序运行:

修改完以上代码并编译完成,就可以运行公布端应用程序了,需要先运行DDS的信息仓库,开始中打开一个CMD窗口,执行如下命令:

1
%DDS_ROOT%/bin/DCPSInfoRepo  -ORBListenEndpoints  iiop://localhost:12345

再次打开一个CMD窗口,cd到Demo目录下,执行如下命令:

1
publisher -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo

至此,公布端应用程序就开发完成并运行起来了。

有关OpenDDS的相关问题欢迎发送邮件至lyingbo@aliyun.com一起讨论

基于OpenDDS应用程序开发(1)IDL定义

连续的三篇博文演示如何基于OpenDDS开发应用程序,将数据从发布端节点发送到订阅端节点,该示例程序由一个发布者发布数据,一个订阅者订阅数据,使用默认的QoS策略和TCP/IP传输方式。

本文是第一篇,主要介绍IDL的定义及编译。

1、IDL定义:

DDS中的数据类型需要通过IDL格式定义,OpenDDS使用#pragma关键字定义DDS传输和处理的数据类型。
这些数据类型再由tao_idl编译器和OpenDDS_idl编译器进行预编译出来,生成用于网络传输这些数据类型的代码。

下面是一个定义Demo数据类型的IDL文件:

1
2
3
4
5
6
7
8
9
10
11
module Demo {

#pragma DCPS_DATA_TYPE "Demo::Pos"
#pragma DCPS_DATA_KEY "Demo::Pos.pos_id"

struct Pos {
long pos_id;
long pos_x;
long pos_y;
};
};

IDL文件使用DCPS_DATA_TYPE定义一个结构体数据类型,IDL语法要求每个类型定义必须以该关键字开头。OpenDDS规定数据类型必须为一个结构体,结构体中可包含标量类型(short、long、float等)、枚举类型、字符串、队列、数组、结构体、以及它们的组合。本例中在demo模块中定义了一个结构体pos。

使用DCPS_DATA_KEY定义一个数据类型的键,一种数据类型可以有0个或多个键,这些键用于区分同一个主题内的不同实体。每个键必须是数值型、枚举型、字符串或者这些类型的typedef。DCPS_DATA_KEY限定了作用域的数据类型以及成员名,成员名标识为该类型的键,多个键由多个DCPS_DATA_KEY指定。在上面的IDL定义中,指定了pos_id作为Demo::Pos的键。每个样本发布时带有一个唯一的pos_id值,这样就可以区分同一主题中的不同实例。由于使用了默认的QoS策略,随后带有相同pos_id值的样本就会替代先前的样本。

其它类型,如结构体、序列和数组不能直接用来做key,但是当结构体的成员或数组中的元素是数值型、枚举型或者字符串型时可以。

2、IDL编译:

IDL文件首先需要由tao_idl编译,以生成数据在网络上传输时打包解包的代码,该IDL编译器位于$ACE_ROOT/bin/目录下,运行命令如下:

1
tao_idl Demo.idl

该命令为每个IDL文件编译生成6个个文件,这6个文件的文件名均由原IDL文件名开头,如下所示:

1
2
3
4
5
6
<filename>C.cpp
<filename>C.h
<filename>C.inl
<filename>S.cpp
<filename>S.h
<filename>S.inl

进而,IDL文件还需要由opendds_idl编译,以生成OpenDDS需要打包和解包信息的序列化和键支持代码,及用于数据读者和写者类型支持代码,该IDL编译器位于$DDS_ROOT/bin/目录下,运行命令如下:

1
opendds_idl Demo.idl

该命令为每个IDL文件编译生成三个文件,这三个文件的文件名均由原IDL文件名开头,如下所示:

1
2
3
<filename>TypeSupport.idl
<filename>TypeSupportImpl.h
<filename>TypeSupportImpl.cpp

会生成DemoTypeSupport.idl、DemoTypeSupportImpl.h和DemoTypeSupportImpl.cpp三个文件,生成的IDL文件中包括了PosTypeSupport, PosDataWriter 和PosDataReader的接口定义,这些特定的DDS类型接口将会在稍后注册数据类型,发布数据样本和接收数据样本时使用,而生成的cpp文件实现了这些接口。

生成的IDL文件还需要和原始的IDL一起,由tao_idl编译器再次编译,以便生成相应的接口框架,之后这些生成的实现文件需要和使用Pos类型的OpenDDS应用程序源码一起编译链接。

opendds_idl编译器提供了许多选项参数,用于配置代码生成,这些选项参数将在后续章节详细描述。

通常,我们不需要手动直接调用tao_idl或者opendds_idl来编译IDL文件,可以借助编译工具来完成,如果使用ACE的MPC(MakeProjectCreator)工具,通过继承dcps来配置,该过程将会很简单,如下是使用ACE的MPC工具来实现这个工作的步骤:

首先,需要编写一个Demo.mpc文件,在mpc文件中定义一个idl工程,内容如下:

1
2
3
4
5
6
7
8
9
project(*idl): dcps {

// This project ensures the common components get built first.
TypeSupport_Files {
Demo.idl
}

custom_only = 1
}

idl工程从父工程dcps继承,父工程dcps中定义了类型支持的编译规则,TypeSupport_Files部分告诉MPC,使用opendds_idl编译器从Demo.idl中生成类型支持文件。

编写好mpc文件之后,就可以使用MPC命令来生成C/C++的编译配置文件了,(MPC工具可以生成很多种系统的编译工程,包括Windows的Visual Studio系列,Linux的Makefile等),这里将Demo.idl文件和Demo.mpc文件放到Demo文件加下,使用如下命令来生成Vs2008的工程:

1
perl mwc.pl -type vc9

命令执行完成后,会生成对应的Vs2008工程文件,包括vcproj和sln等,使用Vs2008打开Demo.sln就可以编译前面定义的idl文件,生成DemoC.cpp和DemoTypeSupport.idl等文件了。

关于公布端和订阅端应用程序的开发请参见后续博文。

有关OpenDDS的相关问题欢迎发送邮件至lyingbo@aliyun.com一起讨论

OpenDDS环境搭建

以下以Linux系统下的用户DDS为例(关于Windows系统下的搭建过程,跟Linux系统步骤基本一样,请参考文后的评论),编译OpenDDS源码需要ACE+TAO框架支持,因此需要先编译ACE+TAO的相关库,具体步骤如下:

1、下载ACE和DDS源码:

ACE源码:http://www.theaceorb.com/downloads
DDS源码:http://download.ociweb.com/OpenDDS

2、拷贝源码包到/WorkSpace目录:

切换到root用户,在根目下创建WorkSpace目录,为WorkSpace目录给所有用户添加+w权限(chmod +w),再切回到原来的DDS用户,将下载的ACE和DDS源码拷贝到WorkSpace目录下并解压,

1
2
3
4
5
cp ACE+TAO-2.2a.tar.gz /WorkSpace
cp OpenDDS3.5.tar.gz /WorkSpace
cd /WorkSpace
tar –xzvf ACE+TAO-6.1.0.tar.gz
tar –xzvf OpenDDS3.5.tar.gz

3、配置ACE和DDS环境变量:

在/home/DDS/.bashrc文件最后加入下面几行:

1
2
3
4
5
export ACE_ROOT=/WorkSpace/ACE_wrappers
export TAO_ROOT=$ACE_ROOT/TAO
export DDS_ROOT=/WorkSpace/DDS
export PATH=$ACE_ROOT/bin: $DDS_ROOT/bin:$PATH
export LD_LIBRARY_PATH=$ACE_ROOT/lib:$DDS_ROOT/lib:$LD_LIBRARY_PATH

使用source /home/DDS/.bashrc命令或者重新登陆终端使环境变量生效

4、准备系统相关配置文件:

创建文件/WorkSpace/ACE_wrappers/ace/config.h,写入如下内容:

1
#include "ace/config-linux.h"

创建文件/WorkSpace/ACE_wrappers/include/makeinclude/platform_macros.GNU,写入如下内容:

1
2
debug=0
include $(ACE_ROOT)/include/makeinclude/platform_linux.GNU

5、编译ACE和DDS源码:

由于ACE+TAO以及DDS源码中提供了许多测试程序,如果所有源码都编译,将非常耗时,可以只编译ACE+TAO中会被DDS用到的部分,以及DDS的核心部分源码。如此,就需要手动重新生成makefile文件,这里可以借助ACE的MPC工具来完成,先在$DDS_ROOT目录下编写一个DDS_TAO_Simple.mwc文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
workspace {
$(ACE_ROOT)/ace/ace.mpc
$(ACE_ROOT)/apps/gperf/src
$(TAO_ROOT)/TAO_IDL
$(TAO_ROOT)/tao/tao.mpc
$(TAO_ROOT)/tao/AnyTypeCode/AnyTypeCode.mpc
$(TAO_ROOT)/tao/Codeset/Codeset.mpc
$(TAO_ROOT)/tao/IORTable/IORTable.mpc
$(TAO_ROOT)/tao/PortableServer/PortableServer.mpc
$(TAO_ROOT)/tao/IORManipulation/IORManipulation.mpc
$(TAO_ROOT)/tao/ImR_Client/ImR_Client.mpc
$(TAO_ROOT)/tao/PI/PI.mpc
$(TAO_ROOT)/tao/CodecFactory/CodecFactory.mpc
$(TAO_ROOT)/orbsvcs/orbsvcs/Svc_Utils.mpc
dds

exclude {
dds/DCPS/QOS_XML_Handler
DevGuideExamples
tools
java
}
}

之后在$DDS_ROOT目录下使用mwc.pl -type gnuace DDS_TAO_Simple.mwc -include $DDS_ROOT/MPC/config命令就可以生成需要的makefile了。

之后执行(cd $DDS_ROOT; make clean && make) 1> make-dds.log 2>make-dds-err.log命令完成ACE和DDS的编译,如果编译出错,可以查看log文件的编译过程,待脚本运行完成,整个OpenDDS的开发环境就搭建好了,可以用来进行基于OpenDDS的二次开发了。

如果需要运行DDS的测试程序,cd到测试程序源码目录使用make命令就可以编译执行了。

有关OpenDDS的相关问题欢迎发送邮件至lyingbo@aliyun.com一起讨论

ACE组播通信

基于ACE的类ACE_SOCK_Dgram_Mcast封装来实现组播发送和接收的简单示例程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// ACE组播类 CMulticast 实现

#include "ace/OS_NS_unistd.h"
#include "ace/INET_Addr.h"
#include "ace/Truncate.h"
#include "ace/Log_Msg.h"
#include "ace/os_include/os_netdb.h"
#include "ace/SOCK_Dgram_Mcast.h"

// ACE组播 ip
#define DEFAULT_MULTICAST_ADD "239.192.2.3:12345"

// ACE组播 TTL
#define MAX_MULTICAST_IP_TTL 5

class CMulticast
{

public:

CMulticast(const char address[]):
remote_addr_(u_short(0)), multicast_addr_(address)
{
// ACE组播加入组播组
if (-1 == mcast_dgram_.join(multicast_addr_))
{
ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"), "join()"));
}

// ACE组播 设置 TTL
if (-1 == mcast_dgram_.set_option(IP_MULTICAST_TTL, MAX_MULTICAST_IP_TTL))
{
ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"), "set_option()"));
}
}


private:
CMulticast(void){};

public:

ssize_t recv_from_multicast_group(char *buf, size_t n)
{
ssize_t ret = mcast_dgram_.recv(buf, n, remote_addr_);

char hostaddr[MAXHOSTNAME] = {0};
remote_addr_.addr_to_string(hostaddr, MAXHOSTNAME);
cout<<"recve from "<<hostaddr<<" data: "<<buf<<endl;

return ret;
}

ssize_t send_to_multicast_group(const char *buf, size_t n)
{
ssize_t ret = mcast_dgram_.send(buf, n);

char hostaddr[MAXHOSTNAME] = {0};
multicast_addr_.addr_to_string(hostaddr, MAXHOSTNAME);
cout<<"send to "<<hostaddr<<" data: "<<buf<<endl;

return ret;
}

~CMulticast(void)
{
// ACE组播离开组播组
if (-1 == mcast_dgram_.leave(multicast_addr_))
{
ACE_ERROR((LM_ERROR, ACE_TEXT("%p\n"), "leave()"));
}
}

private:

ACE_INET_Addr multicast_addr_; // 组播组地址
ACE_INET_Addr remote_addr_; // 远端地址
ACE_SOCK_Dgram_Mcast mcast_dgram_; //
};


// ACE组播 发送端调用示例

#include "Multicast.h"

#define data[] = "Hello, world!"

int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
{
CMulticast mcast(DEFAULT_MULTICAST_ADD);
while (-1 != mcast.send_to_multicast_group(data, sizeof(data)/sizeof(char)))
{
ACE_OS::sleep(2);
}

ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), "send()"), 1);
}


// ACE组播 接收端调用示例

#include "Multicast.h"

int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
{
CMulticast mcast(DEFAULT_MULTICAST_ADD);
char buf[256] = {0};
while (-1 != mcast.recv_from_multicast_group(buf, 255))
{
ACE_OS::sleep(2);
}

ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\n"), "recv()"), 1);
}

有相关问题欢迎发送邮件至lyingbo@aliyun.com一起讨论

MFC程序添加背景图片的方法

介绍两种为MFC程序的添加背景图片的方法:资源位图的方式和IPicture控件方式

1、资源位图的方式(背景图片只能是bmp格式,双缓存方式绘制防止闪烁)

需要先把以”bmp”为后缀的图片通过插入资源的方式添加到工程中,然后调用下面的函数即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//////////////////////////////////////////////////////////////////////////
//功能:在pDC所在窗口上的DestRect区域内显示资源号bmpID指定的位图
//参数:
// pDC:目的DC
// bmpID:目的位图的资源号
// DestRect:目的区域
//////////////////////////////////////////////////////////////////////////
bool DrawBK(CDC *pDC, UINT bmpID, CRect DestRect)
{
CBitmap bitmap;
bitmap.LoadBitmap(bmpID);

BITMAP BitMap;
bitmap.GetBitmap(&BitMap);

CDC dcMem;
dcMem.CreateCompatibleDC(pDC);
dcMem.SelectObject(&bitmap);

pDC->StretchBlt(DestRect.left,DestRect.top,DestRect.Width(),DestRect.Height(),&dcMem,0,0,BitMap.bmWidth,BitMap.bmHeight,SRCCOPY);

return true;
}

2、借用控件IPicture加载图片的方式(背景图片可以是jpg格式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
//////////////////////////////////////////////////////////////////////////
//功能:在pDC所在窗口上的DestRect区域内显示path指定的图片
//参数:
// pDC:目的DC
// path:目的图片的路径
// DestRect:目的区域
//////////////////////////////////////////////////////////////////////////
bool DrawBK(CDC* pDC,CString path,CRect DestRect)
{
if(path.IsEmpty())
return false;

IStream *pStm;
CFileStatus fstatus;
CFile file;
LONG cb;

if(file.Open(path,CFile::modeRead)&&file.GetStatus(path,fstatus)&&((cb=fstatus.m_size)!=-1))
{
HGLOBAL hGlobal = GlobalAlloc(GMEM_MOVEABLE, cb);
if(hGlobal != NULL)
{
LPVOID pvData = GlobalLock(hGlobal);
if (pvData != NULL)
{
file.ReadHuge(pvData, cb);
GlobalUnlock(hGlobal);
CreateStreamOnHGlobal(hGlobal, TRUE, &pStm);
}
}
}
else
{
return false;
}

//显示图片
IPicture *pPic;
CoInitialize(NULL);

if(SUCCEEDED(OleLoadPicture(pStm,fstatus.m_size,TRUE,IID_IPicture,(LPVOID*)&pPic)))
{
//得到源图像的大小
OLE_XSIZE_HIMETRIC hmWidth;
OLE_YSIZE_HIMETRIC hmHeight;
pPic->get_Width(&hmWidth);
pPic->get_Height(&hmHeight);

//使用render函数显示图片
if(FAILED(pPic->Render(*pDC,DestRect.left,DestRect.top,DestRect.Width(),DestRect.Height(),0,hmHeight,hmWidth,-hmHeight,NULL)))
{
pPic->Release();
return false;
}
pPic->Release();
}
else
{
return false;
}
CoUninitialize();
return true;
}

有关问题欢迎发送邮件至lyingbo@aliyun.com一起讨论